Separation of concerns between Shuffle and WorkerShuffle #7195
Conversation
Unfortunately, I haven't fixed
Unit Test Results — see the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests. 15 files ±0, 15 suites ±0, 6h 45m 18s ⏱️ +38m 49s. For more details on these failures, see this check. Results for commit fd15b6f. ± Comparison against base commit 5dccad4. ♻️ This comment has been updated with latest results.
Force-pushed from c6abefc to 3d547b0 (Compare)
class ShuffleTestPool:
    def __init__(self, *args, **kwargs):
        self.shuffles = {}
        super().__init__(*args, **kwargs)

    def __call__(self, addr: str, *args: Any, **kwargs: Any) -> PooledRPCShuffle:
        return PooledRPCShuffle(self.shuffles[addr])

    async def fake_broadcast(self, msg):
        op = msg.pop("op").replace("shuffle_", "")
        out = {}
        for addr, s in self.shuffles.items():
            out[addr] = await getattr(s, op)()
        return out

    def new_shuffle(
        self, name, worker_for_mapping, schema, directory, loop, Shuffle=Shuffle
    ):
        s = Shuffle(
            column="_partition",
            worker_for=worker_for_mapping,
            # FIXME: Is output_workers redundant with worker_for?
            output_workers=set(worker_for_mapping.values()),
            schema=schema,
            directory=directory / name,
            id=ShuffleId(name),
            local_address=name,
            nthreads=2,
            rpc=self,
            loop=loop,
            broadcast=self.fake_broadcast,
        )
        self.shuffles[name] = s
        return s
This might be the most interesting part of this refactoring. I separated Shuffle and ShuffleWorkerExtension sufficiently to allow us to write tests without a scheduler or workers. This allows much more granular control over concurrency issues. In particular, the offload tests below trigger race conditions that would drop data.
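For illustration, a rough sketch of what such a worker-free test can look like, reusing the ShuffleTestPool above. The fixtures (dfs, worker_for_mapping, schema, tmp_path, loop) mirror the test setup shown below, and the Shuffle method names (add_partition, barrier, close) are assumptions that may not match the code exactly:

import asyncio

# Illustrative sketch only: drive two Shuffle instances directly, with no
# scheduler or worker involved.
local_shuffle_pool = ShuffleTestPool()
sA = local_shuffle_pool.new_shuffle("A", worker_for_mapping, schema, tmp_path, loop)
sB = local_shuffle_pool.new_shuffle("B", worker_for_mapping, schema, tmp_path, loop)
try:
    # Each "worker" contributes one input partition; shards travel through
    # PooledRPCShuffle instead of real comms.
    await sA.add_partition(dfs[0])
    await sB.add_partition(dfs[1])
    # The barrier goes through fake_broadcast, which calls inputs_done()
    # on every registered shuffle.
    await sA.barrier()
finally:
    await asyncio.gather(sA.close(), sB.close())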
dfs = []
rows_per_df = 10
n_input_partitions = 2
npartitions = 2
for ix in range(n_input_partitions):
    df = pd.DataFrame({"x": range(rows_per_df * ix, rows_per_df * (ix + 1))})
    df["_partition"] = df.x % npartitions
    dfs.append(df)

workers = ["A", "B"]

worker_for_mapping = {}
partitions_for_worker = defaultdict(list)

for part in range(npartitions):
    worker_for_mapping[part] = w = get_worker_for(part, workers, npartitions)
    partitions_for_worker[w].append(part)
schema = pa.Schema.from_pandas(dfs[0])
This is all boilerplate that I hope we can get rid of eventually. Right now it's not a priority.
rpc: Callable[[str], PooledRPCCall],
broadcast: Callable,
I guess this is the most controversial part. It is very useful, though.
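For context, the value of injecting these callables is that Shuffle never needs to know about a concrete worker. Roughly (illustrative only; the worker-side attributes are assumptions, not the actual wiring in this PR):

# Illustrative only; the exact objects the extension passes may differ.
# In the worker extension, the real comm machinery would be injected:
#   Shuffle(..., rpc=self.worker.rpc, broadcast=self.worker.scheduler.broadcast, ...)
# In the unit tests above, an in-process stand-in is injected instead:
#   Shuffle(..., rpc=local_shuffle_pool, broadcast=local_shuffle_pool.fake_broadcast, ...)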
I fixed a couple of deadlocks in multicomm and multifile in case of exceptions. I think I have already taken this PR too far; I will branch off any follow-up work. This closes #7208.
Closed by #7268.
Builds on #7186
This PR refactors the Worker extension and the Shuffle class to have stricter separation of concerns.
Specifically,
Shuffle is responsible for all splitting, sending, flushing, receiving, etc., and is the sole owner of the associated resources (e.g. comms, buffers, threads, background tasks). It is entirely asynchronous and not intended to be interacted with directly. It is essentially agnostic of what a worker even is.
The extension, on the other hand, is the interface between the worker and the shuffle instance. It routes RPC calls to the shuffle and exposes synchronous methods to be called from the worker thread. It is also responsible for communicating with the scheduler.
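As an illustration of that split, the extension's synchronous surface can be as thin as a hop onto the event loop. This is a sketch under simplified names and signatures, not the actual implementation:

from distributed.utils import sync

class ShuffleWorkerExtension:
    def __init__(self, worker):
        self.worker = worker
        self.shuffles = {}  # ShuffleId -> Shuffle

    def add_partition(self, data, shuffle_id):
        # Called synchronously from the worker thread; the actual splitting,
        # buffering, and sending happens inside the asynchronous Shuffle.
        shuffle = self.shuffles[shuffle_id]
        return sync(self.worker.loop, shuffle.add_partition, data)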
Additional changes include some fixes around concurrency. In particular, this should close #6277 (see test_error_offload and test_slow_offload in test_shuffle.py). The gist is that the shuffle/extension did not wait for threads to complete when flushing (or rather, it didn't wait for the receives to finish before flushing).
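In other words, the fix follows the pattern of tracking in-flight receive work and awaiting it before a flush may complete. A generic sketch of that pattern (not the literal code in this PR):

import asyncio

class ReceiveTracker:
    """Toy illustration of 'flush must wait for pending receives'."""

    def __init__(self):
        self._inflight: set[asyncio.Task] = set()

    def receive(self, coro):
        # Receives run as background tasks (threads in the real code);
        # remember them so flush() can wait for them.
        task = asyncio.create_task(coro)
        self._inflight.add(task)
        task.add_done_callback(self._inflight.discard)
        return task

    async def flush(self):
        # Previously, flushing could return while receives were still running,
        # which could silently drop data; here we wait for all of them first.
        if self._inflight:
            await asyncio.gather(*self._inflight)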